agentmux_launcher\saga\log/
mod.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3//
4// LSD-1 — durable launcher saga log + API.
5//
6// Spec: `docs/specs/SPEC_LAUNCHER_SAGA_DURABILITY_2026-05-01.md`
7//   - §3.1 storage (separate SQLite file at
8//     `<data-dir>/db/launcher-sagas.db`, WAL + 5s busy timeout +
9//     foreign_keys=ON; same configuration as srv saga log. Pre-
10//     AUDIT_SQLITE_SYSTEMS_2026_05_19.md the file lived directly
11//     in `<data-dir>/`; a back-compat migration in
12//     `data_dir::launcher_saga_log_path` moves the legacy file
13//     into `db/` on first launch.)
14//   - §3.2 schema (see `schema.rs`)
15//   - §3.3 API surface (this module)
16//   - §4 PR1 scope: log exists in isolation, NO coordinator wiring
17//
18// Design parallels to `agentmux-srv/src/sagas/log.rs`. Method names
19// match where shape allows (`open`, `start_saga`, `terminate_saga`,
20// `start_step`, `finish_step`, `fail_step`, `unresolved_sagas`,
21// `mark_failed_compensation`, `snapshot_recent`, `max_saga_id`) so
22// anyone reading both feels at home (LSD spec §3.3 last paragraph).
23//
24// Differences from srv's `SagaLog` driven by LSD spec §3.2:
25//   - `target` column on the step table — launcher sagas dispatch to
26//     self / host / srv; the column carries the `PipeTarget` so
27//     `--diag sagas` can show "where did this step go?"
28//   - No `compensated` saga state — F.5/F.6 sagas don't auto-compensate
29//     (LSD spec §3.5). `failed_compensation` is the recovery-marked
30//     terminal state for unresolved sagas at startup (PR LSD-3 wires
31//     the recovery walker; this PR just defines the row).
32//   - Timestamps are RFC3339 TEXT instead of epoch-ms INTEGER. Same
33//     storage cost; greppable in raw SQLite shells. Conversion happens
34//     in `now_rfc3339()` below.
35//   - `vacuum_older_than(cutoff)` API is NEW relative to srv's log
36//     (LSD spec §3.6 retention; srv doesn't ship this yet).
37//
38// Concurrency: a single `Mutex<Connection>` serializes writes. Each
39// `start_step` / `finish_step` call holds the lock for <1ms; launcher
40// saga rate is ≤ a few per second (LSD spec §3.7 — F.5/F.6 fire on
41// rare user-initiated triggers). No connection pool needed.
42//
43// **PR LSD-1 is foundations-only.** The saga coordinator does NOT
44// call any of these methods yet; LSD-2 wires them in. The module is
45// declared on the saga tree (via `mod log;` in `saga/mod.rs`) and
46// re-exports `LauncherSagaLog` so coordinator code can pick it up
47// later without further plumbing changes.
48
49use std::path::Path;
50use std::sync::Mutex;
51
52use agentmux_common::ipc::{Command, Event};
53use chrono::{DateTime, Utc};
54use rusqlite::{params, Connection, OptionalExtension};
55use serde::{Deserialize, Serialize};
56
57use super::PipeTarget;
58
59mod schema;
60
61#[cfg(test)]
62mod tests;
63
/// Error type returned by every fallible `LauncherSagaLog` operation.
///
/// Wraps the three error sources the API can encounter: SQLite, JSON
/// serialization, and (for the public `open(path)` constructor)
/// underlying file IO. Each variant carries `#[from]`, so the `?`
/// operator converts the underlying error automatically.
///
/// Distinct from srv's `StoreError` because srv's WaveStore wraps
/// additional migration-specific variants the launcher log doesn't need.
#[derive(Debug, thiserror::Error)]
pub enum LogError {
    /// A rusqlite call failed (open, pragma, prepare, execute, or row
    /// decoding).
    #[error("sqlite: {0}")]
    Sqlite(#[from] rusqlite::Error),
    /// `serde_json::to_string` failed while serializing a saga input,
    /// a step `Command`, or a step output `Event`.
    #[error("json: {0}")]
    Json(#[from] serde_json::Error),
    /// Underlying file IO failed.
    /// NOTE(review): nothing in this module constructs this variant
    /// directly — presumably produced by `schema.rs` or by callers;
    /// confirm.
    #[error("io: {0}")]
    Io(#[from] std::io::Error),
}
78
/// Terminal outcome of a launcher saga, persisted by `terminate_saga`.
///
/// Each variant maps to one value of the `launcher_saga.state` column
/// (see `SagaOutcome::state_str`); variants that carry a `reason` also
/// populate the `failure_reason` column (see `SagaOutcome::reason`).
///
/// PR LSD-1 declares all variants up-front so the LSD-2 coordinator
/// wiring + LSD-3 recovery walker can land without further enum
/// edits. `dead_code` is suppressed at the enum level because every
/// variant is wired in a follow-up PR — opt-in suppression at the
/// type avoids per-variant `#[allow]` clutter.
///
/// Relationship to srv's `SagaOutcome`: same shape, plus
/// `FailedCompensation` (LSD spec §3.5 — recovery-walker terminal
/// state), minus `Compensated` (no auto-compensation in launcher
/// sagas yet).
///
/// Intended call sites:
///   - PR LSD-2 calls `terminate_saga(.., Completed)` from
///     `apply_action` when a saga returns `SagaAction::Done`, and
///     `Failed` when a saga returns `SagaAction::Failed` or is evicted
///     by the same-kind concurrent gate.
///   - PR LSD-3's recovery walker calls `mark_failed_compensation`
///     directly rather than `terminate_saga(FailedCompensation { .. })`
///     because recovery wants to be idempotent across repeated
///     crash-restart cycles (the row may already be in
///     `failed_compensation` from a prior recovery).
///
/// Note: launcher sagas have no `Compensated` terminal state today
/// (per LSD spec §3.2 + §7 open question). F.5/F.6 sagas don't
/// auto-compensate; the schema CHECK constraint on `launcher_saga.state`
/// intentionally omits `'compensated'`. If a future class-D/E saga
/// needs compensation, add the variant + matching CHECK constraint
/// migration together — never one without the other.
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(dead_code)] // every variant wired by PR LSD-2 / LSD-3; pinned today by tests.rs
pub enum SagaOutcome {
    /// Saga ran to completion successfully (`SagaAction::Done` path).
    /// Persisted as state `"completed"` with no failure reason.
    Completed,
    /// Saga failed with no compensation having run. Persisted as
    /// state `"failed"`; `reason` becomes the row's `failure_reason`.
    Failed { reason: String },
    /// Saga was unresolved at launcher restart and the recovery walker
    /// marked it as such. Distinct from `Failed` so operators can
    /// filter for "interesting" cases via `--diag sagas`. Persisted as
    /// state `"failed_compensation"`.
    ///
    /// Constructed by PR LSD-3's recovery walker (via
    /// `mark_failed_compensation`); included here for symmetry with
    /// the schema CHECK constraint.
    FailedCompensation { reason: String },
}
122
123impl SagaOutcome {
124    fn state_str(&self) -> &'static str {
125        match self {
126            SagaOutcome::Completed => "completed",
127            SagaOutcome::Failed { .. } => "failed",
128            SagaOutcome::FailedCompensation { .. } => "failed_compensation",
129        }
130    }
131
132    fn reason(&self) -> Option<&str> {
133        match self {
134            SagaOutcome::Completed => None,
135            SagaOutcome::Failed { reason }
136            | SagaOutcome::FailedCompensation { reason } => Some(reason.as_str()),
137        }
138    }
139}
140
141/// Serialize a `PipeTarget` to the schema's `target` column. Mirrors
142/// srv's `command_discriminant_name` style (snake_case strings rather
143/// than Debug formatting) so `--diag sagas` output is greppable.
144fn pipe_target_str(t: PipeTarget) -> &'static str {
145    match t {
146        PipeTarget::LauncherSelf => "launcher_self",
147        PipeTarget::Host => "host",
148        PipeTarget::Srv => "srv",
149    }
150}
151
/// A saga in `running`, `compensating`, or `failed` state at startup.
/// Returned by `unresolved_sagas`; consumed by PR LSD-3's recovery
/// walker to mark each as `failed_compensation` (LSD spec §3.5).
///
/// All string fields are the raw column values from the
/// `launcher_saga` table; nothing is parsed or validated on read.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)] // Fields consumed by PR LSD-3's recovery walker.
pub struct UnresolvedLauncherSaga {
    /// Saga identifier (`launcher_saga.saga_id`, stored as a SQLite
    /// integer and cast back to `u64` on read).
    pub saga_id: u64,
    /// Saga name as passed to `start_saga`.
    pub name: String,
    /// Raw lifecycle state string; for rows produced by
    /// `unresolved_sagas` this is one of `running`, `compensating`,
    /// or `failed`.
    pub state: String,
    /// RFC3339 timestamp text recorded when the saga row was inserted.
    pub started_at: String,
    /// JSON text of the saga input, as serialized by `start_saga`.
    pub input_json: String,
    /// Failure reason column; `None` when the column is NULL.
    pub failure_reason: Option<String>,
    /// Every step row of this saga, in ascending `step_index` order.
    pub steps: Vec<UnresolvedLauncherStep>,
}
166
/// A step row attached to an `UnresolvedLauncherSaga`. Steps are
/// returned in `step_index` ascending order.
///
/// Also returned on its own by `get_saga_steps`, which reads step rows
/// regardless of the owning saga's state. Fields are the raw column
/// values from the `launcher_saga_step` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)] // Fields consumed by PR LSD-3's recovery walker.
pub struct UnresolvedLauncherStep {
    /// Position of the step within its saga (read as a SQLite integer
    /// and cast to `u32`).
    pub step_index: u32,
    /// Short discriminant string passed to `start_step`.
    pub name: String,
    /// Raw step state string; this module writes `pending`,
    /// `succeeded`, and `failed`.
    pub state: String,
    /// Pipe the command was destined for (`launcher_self`, `host`, or
    /// `srv` when written by `start_step`); `None` if the column is NULL.
    pub target: Option<String>,
    /// JSON text of the dispatched `Command`; `None` if the column is NULL.
    pub cmd_json: Option<String>,
    /// JSON text of the awaited `Event`, set by `finish_step`; `None`
    /// until then.
    pub output_json: Option<String>,
    /// RFC3339 timestamp text recorded when the step row was inserted.
    pub started_at: String,
    /// RFC3339 timestamp text set by `finish_step` / `fail_step`;
    /// `None` while the step is still pending.
    pub ended_at: Option<String>,
    /// Reason recorded by `fail_step`; `None` otherwise.
    pub failure_reason: Option<String>,
}
182
/// Operator-facing snapshot of a recent saga, for `--diag sagas`.
/// Returned by `snapshot_recent`. Sorted most-recent-first by
/// `COALESCE(ended_at, started_at)`.
///
/// String fields are the raw `launcher_saga` column values.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(dead_code)] // Fields consumed by PR LSD-3's `--diag sagas`.
pub struct SagaSummary {
    /// Saga identifier (stored as a SQLite integer, cast to `u64`).
    pub saga_id: u64,
    /// Saga name as passed to `start_saga`.
    pub name: String,
    /// Raw lifecycle state string of the saga row.
    pub state: String,
    /// RFC3339 timestamp text recorded when the saga row was inserted.
    pub started_at: String,
    /// RFC3339 timestamp text of the terminal write; `None` while the
    /// saga has not been terminated.
    pub ended_at: Option<String>,
    /// Failure reason column; `None` when the column is NULL.
    pub failure_reason: Option<String>,
    /// Count of steps in `succeeded` or `compensated` state — i.e.
    /// progress through the saga. Pending and failed steps are not
    /// counted.
    pub step_count: u32,
    /// JSON of saga input args, for operator triage.
    pub input_json: String,
}
201
/// SQLite-backed launcher saga log. Owned by `SagaCoordinator` as
/// `Arc<LauncherSagaLog>` once PR LSD-2 wires it; PR LSD-1 only
/// constructs and tests it in isolation.
///
/// All methods take `&self` and acquire the internal mutex for the
/// duration of the call, so a single instance can be shared across
/// threads. Methods call `.lock().unwrap()` and therefore panic if the
/// mutex has been poisoned by a panic in another holder.
pub struct LauncherSagaLog {
    /// The single SQLite connection; the mutex serializes every read
    /// and write issued through this log.
    conn: Mutex<Connection>,
}
208
209impl LauncherSagaLog {
210    /// Open a saga log backed by the given SQLite file. Configures
211    /// WAL mode + 5s busy timeout + `foreign_keys=ON` (mirroring
212    /// `SagaLog::open` in `agentmux-srv/src/sagas/log.rs`) and
213    /// applies the schema migration from `schema.rs`.
214    ///
215    /// Idempotent: reopening the same DB applies the same DDL via
216    /// `CREATE TABLE IF NOT EXISTS` — no double-creation, no error.
217    #[allow(dead_code)] // wired in PR LSD-2 (`main.rs` opens the log on startup)
218    pub fn open(path: &Path) -> Result<Self, LogError> {
219        let conn = Connection::open(path)?;
220        Self::configure_and_migrate(conn)
221    }
222
223    /// Open the saga log in read-only mode. Used by `--diag sagas`
224    /// so an operator's diagnostic invocation can't accidentally
225    /// schema-migrate or modify a log that a running launcher owns.
226    /// Skips `configure_and_migrate` (read-only opens shouldn't run
227    /// migrations) and skips the `foreign_keys=ON` pragma since we
228    /// don't write. (codex P2 PR #647 round 3.)
229    pub fn open_read_only(path: &Path) -> Result<Self, LogError> {
230        let conn = Connection::open_with_flags(
231            path,
232            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
233                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
234        )?;
235        // Read-only PRAGMAs only: busy_timeout for WAL coexistence with
236        // a running launcher, no journal_mode change, no migrations.
237        conn.execute_batch("PRAGMA busy_timeout=5000;")?;
238        Ok(Self {
239            conn: Mutex::new(conn),
240        })
241    }
242
243    /// Open an in-memory saga log for testing. Used by `tests.rs`
244    /// and by future PR LSD-2 coordinator integration tests.
245    #[allow(dead_code)] // exercised under #[cfg(test)] only; see saga/log/tests.rs
246    pub fn open_in_memory() -> Result<Self, LogError> {
247        let conn = Connection::open_in_memory()?;
248        Self::configure_and_migrate(conn)
249    }
250
251    fn configure_and_migrate(conn: Connection) -> Result<Self, LogError> {
252        // Same pragma block as srv's `SagaLog::configure_and_migrate`
253        // (codex P2 PR #631). `foreign_keys=ON` enforces the
254        // `launcher_saga_step.saga_id REFERENCES launcher_saga(saga_id)`
255        // declaration; without it, orphan step rows can be written
256        // silently and corrupt diagnostics.
257        conn.execute_batch(
258            "PRAGMA journal_mode=WAL;
259             PRAGMA busy_timeout=5000;
260             PRAGMA synchronous=NORMAL;
261             PRAGMA temp_store=MEMORY;
262             PRAGMA foreign_keys=ON;",
263        )?;
264        schema::run_migrations(&conn)?;
265        schema::stamp_and_check_version(&conn)?;
266        Ok(Self {
267            conn: Mutex::new(conn),
268        })
269    }
270
271    /// Highest existing `saga_id` in the durable log, or 0 if empty.
272    /// PR LSD-2 calls this at coordinator startup to seed
273    /// `next_saga_id` so a launcher restart doesn't reuse ids that
274    /// already have rows in the log.
275    #[allow(dead_code)] // wired in PR LSD-2 (`SagaCoordinator::new` seed)
276    pub fn max_saga_id(&self) -> Result<u64, LogError> {
277        let conn = self.conn.lock().unwrap();
278        // Mirror srv's `max_saga_id` — propagate query errors so a
279        // transient SQLite read failure doesn't silently reseed the
280        // allocator to 0 and re-collide with prior rows (codex P2 PR
281        // #631 round 2 rationale; same hazard applies here).
282        let max: Option<i64> =
283            conn.query_row("SELECT MAX(saga_id) FROM launcher_saga", [], |r| r.get(0))?;
284        Ok(max.unwrap_or(0).max(0) as u64)
285    }
286
287    /// Insert a fresh saga row in `running` state. Plain INSERT (not
288    /// OR REPLACE): a duplicate saga_id is a bug worth surfacing,
289    /// not a silent overwrite. Same rationale as srv's `start_saga`
290    /// (codex P1 + reagent P1 PR #631).
291    #[allow(dead_code)] // wired in PR LSD-2 (`SagaCoordinator::spawn_saga`)
292    pub fn start_saga(
293        &self,
294        saga_id: u64,
295        name: &str,
296        input: &serde_json::Value,
297    ) -> Result<(), LogError> {
298        let now = now_rfc3339();
299        let input_json = serde_json::to_string(input)?;
300        let conn = self.conn.lock().unwrap();
301        conn.execute(
302            "INSERT INTO launcher_saga
303             (saga_id, name, state, started_at, ended_at, input_json, failure_reason)
304             VALUES (?1, ?2, 'running', ?3, NULL, ?4, NULL)",
305            params![saga_id as i64, name, now, input_json],
306        )?;
307        Ok(())
308    }
309
310    /// Write a saga's terminal lifecycle row. Called by PR LSD-2's
311    /// `apply_action` when the saga returns `Done` / `Failed`. The
312    /// recovery walker uses `mark_failed_compensation` instead.
313    #[allow(dead_code)] // wired in PR LSD-2
314    pub fn terminate_saga(&self, saga_id: u64, outcome: SagaOutcome) -> Result<(), LogError> {
315        let now = now_rfc3339();
316        let state = outcome.state_str();
317        let reason = outcome.reason();
318        let conn = self.conn.lock().unwrap();
319        conn.execute(
320            "UPDATE launcher_saga
321             SET state = ?1, ended_at = ?2, failure_reason = ?3
322             WHERE saga_id = ?4",
323            params![state, now, reason, saga_id as i64],
324        )?;
325        Ok(())
326    }
327
328    /// Insert a `pending` step row before dispatching the command.
329    /// `name` is a short discriminant string (e.g. "issue_cmd_host_reap_panes");
330    /// `cmd` is serialized as JSON for replay/debugging; `target`
331    /// records which pipe the command was destined for so `--diag
332    /// sagas` can show provenance.
333    #[allow(dead_code)] // wired in PR LSD-2 (`SagaCoordinator::apply_action::IssueCmd`)
334    pub fn start_step(
335        &self,
336        saga_id: u64,
337        step_index: u32,
338        name: &str,
339        target: PipeTarget,
340        cmd: &Command,
341    ) -> Result<(), LogError> {
342        let now = now_rfc3339();
343        let cmd_json = serde_json::to_string(cmd)?;
344        let target_str = pipe_target_str(target);
345        let conn = self.conn.lock().unwrap();
346        conn.execute(
347            "INSERT INTO launcher_saga_step
348             (saga_id, step_index, name, state, cmd_json, target, started_at, ended_at, output_json, failure_reason)
349             VALUES (?1, ?2, ?3, 'pending', ?4, ?5, ?6, NULL, NULL, NULL)",
350            params![
351                saga_id as i64,
352                step_index,
353                name,
354                cmd_json,
355                target_str,
356                now
357            ],
358        )?;
359        Ok(())
360    }
361
362    /// Mark a step `succeeded` and store the awaited event as JSON.
363    /// PR LSD-2 calls this from `route_event_to_sagas` when a saga's
364    /// `on_event` consumes its awaited bus event.
365    #[allow(dead_code)] // wired in PR LSD-2
366    pub fn finish_step(
367        &self,
368        saga_id: u64,
369        step_index: u32,
370        output: &Event,
371    ) -> Result<(), LogError> {
372        let now = now_rfc3339();
373        let output_json = serde_json::to_string(output)?;
374        let conn = self.conn.lock().unwrap();
375        conn.execute(
376            "UPDATE launcher_saga_step
377             SET state = 'succeeded', output_json = ?1, ended_at = ?2
378             WHERE saga_id = ?3 AND step_index = ?4",
379            params![output_json, now, saga_id as i64, step_index],
380        )?;
381        Ok(())
382    }
383
384    /// Mark a step `failed`. Stores the reason in the step's
385    /// `failure_reason` column (distinct from srv's log, which
386    /// stuffs the reason into `output_json` as `{"error": ...}`;
387    /// LSD schema gives us a dedicated column so we use it).
388    #[allow(dead_code)] // wired in PR LSD-2 (saga timeout / dispatch error path)
389    pub fn fail_step(
390        &self,
391        saga_id: u64,
392        step_index: u32,
393        reason: &str,
394    ) -> Result<(), LogError> {
395        let now = now_rfc3339();
396        let conn = self.conn.lock().unwrap();
397        conn.execute(
398            "UPDATE launcher_saga_step
399             SET state = 'failed', failure_reason = ?1, ended_at = ?2
400             WHERE saga_id = ?3 AND step_index = ?4",
401            params![reason, now, saga_id as i64, step_index],
402        )?;
403        Ok(())
404    }
405
406    /// Return all sagas still in `running`, `compensating`, or
407    /// `failed` state, each with its full step list. PR LSD-3's
408    /// startup recovery walker iterates this list and calls
409    /// `mark_failed_compensation` on each (LSD spec §3.5).
410    ///
411    /// `failed` is included for symmetry with srv's `unresolved_sagas`
412    /// (codex P1 PR #631 round 2): a launcher saga marked `failed`
413    /// without restart-time recovery would still benefit from the
414    /// `failed_compensation` upgrade so `--diag sagas` consistently
415    /// surfaces it as "needs operator attention."
416    pub fn unresolved_sagas(&self) -> Result<Vec<UnresolvedLauncherSaga>, LogError> {
417        let conn = self.conn.lock().unwrap();
418        let mut stmt = conn.prepare(
419            "SELECT saga_id, name, state, started_at, input_json, failure_reason
420             FROM launcher_saga
421             WHERE state IN ('running', 'compensating', 'failed')
422             ORDER BY saga_id ASC",
423        )?;
424        let saga_rows: Vec<(i64, String, String, String, String, Option<String>)> = stmt
425            .query_map([], |row| {
426                Ok((
427                    row.get::<_, i64>(0)?,
428                    row.get::<_, String>(1)?,
429                    row.get::<_, String>(2)?,
430                    row.get::<_, String>(3)?,
431                    row.get::<_, String>(4)?,
432                    row.get::<_, Option<String>>(5)?,
433                ))
434            })?
435            .collect::<Result<Vec<_>, _>>()?;
436        drop(stmt);
437
438        let mut out = Vec::with_capacity(saga_rows.len());
439        for (saga_id, name, state, started_at, input_json, failure_reason) in saga_rows {
440            let mut step_stmt = conn.prepare(
441                "SELECT step_index, name, state, target, cmd_json,
442                        output_json, started_at, ended_at, failure_reason
443                 FROM launcher_saga_step
444                 WHERE saga_id = ?1
445                 ORDER BY step_index ASC",
446            )?;
447            let steps: Vec<UnresolvedLauncherStep> = step_stmt
448                .query_map(params![saga_id], |row| {
449                    Ok(UnresolvedLauncherStep {
450                        step_index: row.get::<_, i64>(0)? as u32,
451                        name: row.get::<_, String>(1)?,
452                        state: row.get::<_, String>(2)?,
453                        target: row.get::<_, Option<String>>(3)?,
454                        cmd_json: row.get::<_, Option<String>>(4)?,
455                        output_json: row.get::<_, Option<String>>(5)?,
456                        started_at: row.get::<_, String>(6)?,
457                        ended_at: row.get::<_, Option<String>>(7)?,
458                        failure_reason: row.get::<_, Option<String>>(8)?,
459                    })
460                })?
461                .collect::<Result<Vec<_>, _>>()?;
462            out.push(UnresolvedLauncherSaga {
463                saga_id: saga_id as u64,
464                name,
465                state,
466                started_at,
467                input_json,
468                failure_reason,
469                steps,
470            });
471        }
472        Ok(out)
473    }
474
475    /// Fetch the step rows for a single saga regardless of saga state.
476    /// `unresolved_sagas` filters out `failed_compensation` (and other
477    /// terminal states), but `--diag sagas` needs to surface step rows
478    /// for sagas the recovery walker just marked `failed_compensation`
479    /// — operators triaging a recovered crash need to see what was
480    /// pending when the launcher exited. (codex P1 PR #647 round 1.)
481    pub fn get_saga_steps(&self, saga_id: u64) -> Result<Vec<UnresolvedLauncherStep>, LogError> {
482        let conn = self.conn.lock().unwrap();
483        let mut stmt = conn.prepare(
484            "SELECT step_index, name, state, target, cmd_json,
485                    output_json, started_at, ended_at, failure_reason
486             FROM launcher_saga_step
487             WHERE saga_id = ?1
488             ORDER BY step_index ASC",
489        )?;
490        let steps: Vec<UnresolvedLauncherStep> = stmt
491            .query_map(params![saga_id as i64], |row| {
492                Ok(UnresolvedLauncherStep {
493                    step_index: row.get::<_, i64>(0)? as u32,
494                    name: row.get::<_, String>(1)?,
495                    state: row.get::<_, String>(2)?,
496                    target: row.get::<_, Option<String>>(3)?,
497                    cmd_json: row.get::<_, Option<String>>(4)?,
498                    output_json: row.get::<_, Option<String>>(5)?,
499                    started_at: row.get::<_, String>(6)?,
500                    ended_at: row.get::<_, Option<String>>(7)?,
501                    failure_reason: row.get::<_, Option<String>>(8)?,
502                })
503            })?
504            .collect::<Result<Vec<_>, _>>()?;
505        Ok(steps)
506    }
507
508    /// Mark a saga as `failed_compensation` — the recovery walker's
509    /// terminal write. Idempotent across repeated calls (the saga
510    /// stays in `failed_compensation`; `ended_at` is rewritten to
511    /// the latest call's timestamp; `failure_reason` is preserved
512    /// when already populated and the new reason is APPENDED rather
513    /// than overwritten — see SQL CASE WHEN below). This preserves
514    /// the original failure cause (e.g. timeout) while adding the
515    /// recovery context. (codex P2 PR #647 round 1: post-mortem
516    /// preservation.) LSD spec §3.5 — operator-review terminal state.
517    pub fn mark_failed_compensation(
518        &self,
519        saga_id: u64,
520        reason: &str,
521    ) -> Result<(), LogError> {
522        let now = now_rfc3339();
523        let conn = self.conn.lock().unwrap();
524        // Preserve original failure_reason when already populated.
525        // A saga in `failed` state pre-crash carries the precise
526        // original cause (timeout, dispatch error, etc.) that
527        // operators need for post-mortem. Recovery transitions
528        // state to `failed_compensation` but augments rather than
529        // replaces failure_reason — appends the restart context
530        // so both signals are visible in `--diag sagas`.
531        // (codex P2 PR #647 round 1.)
532        conn.execute(
533            "UPDATE launcher_saga
534             SET state = 'failed_compensation',
535                 ended_at = ?1,
536                 failure_reason = CASE
537                     WHEN failure_reason IS NULL OR failure_reason = ''
538                       THEN ?2
539                     ELSE failure_reason || ' | recovered: ' || ?2
540                 END
541             WHERE saga_id = ?3",
542            params![now, reason, saga_id as i64],
543        )?;
544        Ok(())
545    }
546
547    /// Return up to `limit` recent sagas for `--diag sagas`. Sorted
548    /// most-recent-first by `COALESCE(ended_at, started_at)`. Mirrors
549    /// srv's `snapshot_recent` shape.
550    pub fn snapshot_recent(&self, limit: usize) -> Result<Vec<SagaSummary>, LogError> {
551        let conn = self.conn.lock().unwrap();
552        let mut stmt = conn.prepare(
553            "SELECT saga_id, name, state, started_at, ended_at, failure_reason, input_json
554             FROM launcher_saga
555             ORDER BY COALESCE(ended_at, started_at) DESC, saga_id DESC
556             LIMIT ?1",
557        )?;
558        let rows: Vec<(
559            i64,
560            String,
561            String,
562            String,
563            Option<String>,
564            Option<String>,
565            String,
566        )> = stmt
567            .query_map(params![limit as i64], |row| {
568                Ok((
569                    row.get::<_, i64>(0)?,
570                    row.get::<_, String>(1)?,
571                    row.get::<_, String>(2)?,
572                    row.get::<_, String>(3)?,
573                    row.get::<_, Option<String>>(4)?,
574                    row.get::<_, Option<String>>(5)?,
575                    row.get::<_, String>(6)?,
576                ))
577            })?
578            .collect::<Result<Vec<_>, _>>()?;
579        drop(stmt);
580
581        let mut out = Vec::with_capacity(rows.len());
582        for (saga_id, name, state, started_at, ended_at, failure_reason, input_json) in rows {
583            let count: Option<i64> = conn
584                .query_row(
585                    "SELECT COUNT(*) FROM launcher_saga_step
586                     WHERE saga_id = ?1 AND state IN ('succeeded', 'compensated')",
587                    params![saga_id],
588                    |row| row.get(0),
589                )
590                .optional()?;
591            out.push(SagaSummary {
592                saga_id: saga_id as u64,
593                name,
594                state,
595                started_at,
596                ended_at,
597                failure_reason,
598                step_count: count.unwrap_or(0) as u32,
599                input_json,
600            });
601        }
602        Ok(out)
603    }
604
605    /// Delete saga rows whose `ended_at` is before `cutoff` AND whose
606    /// state is terminal (`completed`, `failed`, `failed_compensation`).
607    /// Returns the number of rows deleted. In-flight sagas (`running`,
608    /// `compensating`) are NEVER vacuumed — that would mask
609    /// crashed-mid-saga incidents from the recovery walker (LSD spec §3.6).
610    ///
611    /// `ON DELETE CASCADE` on `launcher_saga_step.saga_id` ensures
612    /// the corresponding step rows go with the saga in a single
613    /// SQLite transaction — no manual cleanup needed.
614    // LSD-4: wired by `main.rs::run_windows` startup retention task.
615    pub fn vacuum_older_than(&self, cutoff: DateTime<Utc>) -> Result<usize, LogError> {
616        let cutoff_str = cutoff.to_rfc3339();
617        let conn = self.conn.lock().unwrap();
618        let removed = conn.execute(
619            "DELETE FROM launcher_saga
620             WHERE state IN ('completed', 'failed', 'failed_compensation')
621               AND ended_at IS NOT NULL
622               AND ended_at < ?1",
623            params![cutoff_str],
624        )?;
625        Ok(removed)
626    }
627}
628
629/// RFC3339 timestamp for `started_at` / `ended_at` columns. Single
630/// helper so test+production paths agree on format precisely.
631fn now_rfc3339() -> String {
632    Utc::now().to_rfc3339()
633}